/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.core.client.impl;

import com.chinamobile.cmss.lakehouse.common.exception.BaseException;
import com.chinamobile.cmss.lakehouse.common.kubernetes.K8sModelConstant;
import com.chinamobile.cmss.lakehouse.common.kubernetes.NodeResource;
import com.chinamobile.cmss.lakehouse.core.client.IKubernetesClient;
import com.chinamobile.cmss.lakehouse.core.config.KubeConfig;

import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.net.HttpURLConnection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.gson.JsonSyntaxException;
import io.kubernetes.client.PodLogs;
import io.kubernetes.client.custom.Quantity;
import io.kubernetes.client.custom.QuantityFormatter;
import io.kubernetes.client.custom.V1Patch;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.AppsV1Api;
import io.kubernetes.client.openapi.apis.BatchV1Api;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.apis.ExtensionsV1beta1Api;
import io.kubernetes.client.openapi.apis.RbacAuthorizationV1Api;
import io.kubernetes.client.openapi.models.V1ClusterRoleBinding;
import io.kubernetes.client.openapi.models.V1ClusterRoleBindingList;
import io.kubernetes.client.openapi.models.V1ConfigMap;
import io.kubernetes.client.openapi.models.V1ConfigMapList;
import io.kubernetes.client.openapi.models.V1Container;
import io.kubernetes.client.openapi.models.V1DeleteOptions;
import io.kubernetes.client.openapi.models.V1Deployment;
import io.kubernetes.client.openapi.models.V1DeploymentList;
import io.kubernetes.client.openapi.models.V1Namespace;
import io.kubernetes.client.openapi.models.V1NamespaceBuilder;
import io.kubernetes.client.openapi.models.V1NamespaceList;
import io.kubernetes.client.openapi.models.V1Node;
import io.kubernetes.client.openapi.models.V1NodeList;
import io.kubernetes.client.openapi.models.V1ObjectMetaBuilder;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1ServiceList;
import io.kubernetes.client.openapi.models.V1StatefulSet;
import io.kubernetes.client.openapi.models.V1StatefulSetList;
import io.kubernetes.client.openapi.models.V1Status;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.PatchUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@VisibleForTesting
@Component
public class KubernetesClientImpl implements IKubernetesClient {

    // Used in this class for StatefulSet and Deployment operations, and as the source of the JSON serializer for logging.
    private AppsV1Api appsV1Api;
    // Used for Namespace, ConfigMap, Service, Pod and Node operations.
    private CoreV1Api coreV1Api;
    // Created in the constructor; not referenced by any other method visible in this class.
    private BatchV1Api batchV1Api;
    // Token-authenticated client built in the constructor; exposed through getApiClient().
    private ApiClient apiClient;
    // Created in the constructor; not referenced by any other method visible in this class.
    private ExtensionsV1beta1Api extensionsV1beta1Api;
    // Used to open pod log streams.
    private PodLogs podLogs;
    // Used for ClusterRoleBinding operations.
    private RbacAuthorizationV1Api rbacAuthorizationV1Api;

    /**
     * Creates a client authenticated with a bearer token against the API server configured in
     * {@code kubeConfig}.
     * <p>
     * User token required to access the remote k8s cluster, which can be generated as below:
     * kubectl create serviceaccount lakehouse-admin
     * kubectl create clusterrolebinding lakehouse-admin-clusterrolebinding --clusterrole=cluster-admin --serviceaccount=default:lakehouse-admin
     * kubectl get $(kubectl get secret -o name | grep lakehouse-admin) -o jsonpath='{.data.token}' | base64 --decode | xargs echo
     *
     * @param kubeConfig supplies the API server address and the access token
     */
    @Autowired
    public KubernetesClientImpl(KubeConfig kubeConfig) {
        // The third argument (validateSSL) is false: the API server certificate is NOT verified.
        apiClient = Config.fromToken(kubeConfig.getApiServer(), kubeConfig.getAccessToken(), false);
        // Still published as the process-wide default so code relying on it keeps working.
        Configuration.setDefaultApiClient(apiClient);
        // Bind every API handle to this instance's client explicitly instead of re-reading the
        // mutable global default, which another component could replace in between.
        appsV1Api = new AppsV1Api(apiClient);
        coreV1Api = new CoreV1Api(apiClient);
        batchV1Api = new BatchV1Api(apiClient);
        podLogs = new PodLogs(apiClient);
        extensionsV1beta1Api = new ExtensionsV1beta1Api(apiClient);
        rbacAuthorizationV1Api = new RbacAuthorizationV1Api(apiClient);
    }

    /**
     * Assembles a {@link V1Namespace} model carrying the given name together with the API version,
     * kind and labels defined in {@link K8sModelConstant}.
     *
     * @param namespaceName value stored as the metadata name
     * @return the assembled namespace model (nothing is sent to the cluster)
     */
    public static V1Namespace buildNamespace(String namespaceName) {
        final V1ObjectMetaBuilder metadata = new V1ObjectMetaBuilder()
            .withName(namespaceName)
            .withLabels(K8sModelConstant.NAMESPACE_LABELS);
        return new V1NamespaceBuilder()
            .withApiVersion(K8sModelConstant.API_VERSION)
            .withKind(K8sModelConstant.NAMESPACE_KIND)
            .withMetadata(metadata.build())
            .build();
    }

    /**
     * Lists namespaces filtered by {@code labelSelector}; every other optional argument of the
     * client call is passed as {@code null}.
     *
     * @param labelSelector label selector forwarded to the API call
     * @return the namespace list returned by the API
     * @throws ApiException if the API call fails
     */
    @Override
    public V1NamespaceList listNamespace(String labelSelector) throws ApiException {
        final V1NamespaceList namespaces =
            coreV1Api.listNamespace(null, null, null, null, labelSelector, null, null, null, null);
        return namespaces;
    }

    /**
     * Lists the ConfigMaps of {@code namespace} with all optional list arguments left {@code null}.
     *
     * @param namespace namespace to list
     * @return the ConfigMap list returned by the API
     * @throws ApiException if the API call fails
     */
    @Override
    public V1ConfigMapList listConfigMap(String namespace) throws ApiException {
        final V1ConfigMapList configMaps = coreV1Api.listNamespacedConfigMap(
            namespace, null, null, null, null, null, null, null, null, null);
        return configMaps;
    }

    /**
     * Lists the Services of {@code namespace} with all optional list arguments left {@code null}.
     *
     * @param namespace namespace to list
     * @return the Service list returned by the API
     * @throws ApiException if the API call fails
     */
    @Override
    public V1ServiceList listService(final String namespace) throws ApiException {
        final V1ServiceList services = coreV1Api.listNamespacedService(
            namespace, null, null, null, null, null, null, null, null, null);
        return services;
    }

    /**
     * Lists the StatefulSets of {@code namespace} with all optional list arguments left
     * {@code null}.
     *
     * @param namespace namespace to list
     * @return the StatefulSet list returned by the API
     * @throws ApiException if the API call fails
     */
    @Override
    public V1StatefulSetList listStatefulSet(String namespace) throws ApiException {
        final V1StatefulSetList statefulSets = appsV1Api.listNamespacedStatefulSet(
            namespace, null, null, null, null, null, null, null, null, null);
        return statefulSets;
    }

    /**
     * Lists the Deployments of {@code namespace} with all optional list arguments left
     * {@code null}.
     *
     * @param namespace namespace to list
     * @return the Deployment list returned by the API
     * @throws ApiException if the API call fails
     */
    @Override
    public V1DeploymentList listDeployment(String namespace) throws ApiException {
        final V1DeploymentList deployments = appsV1Api.listNamespacedDeployment(
            namespace, null, null, null, null, null, null, null, null, null);
        return deployments;
    }

    /**
     * Lists pods across all namespaces with every optional list argument left {@code null}.
     *
     * @return the pod list returned by the API
     * @throws ApiException if the API call fails
     */
    @Override
    public V1PodList listPodForAllNamespaces() throws ApiException {
        final V1PodList pods =
            coreV1Api.listPodForAllNamespaces(null, null, null, null, null, null, null, null, null);
        return pods;
    }

    /**
     * Builds a namespace model via {@link #buildNamespace(String)}, logs it and creates it in the
     * cluster.
     *
     * @param namespace name of the namespace to create
     * @return the namespace object returned by the API
     * @throws ApiException if the API call fails
     */
    @Override
    public V1Namespace createNamespace(final String namespace) throws ApiException {
        V1Namespace ns = buildNamespace(namespace);
        // A Namespace object has no metadata.namespace of its own (buildNamespace only sets the
        // name), so hand the logger the namespace name rather than that unset field.
        log(namespace, ns.getKind(), ns);
        return coreV1Api.createNamespace(ns, null, null, null);
    }

    /**
     * Creates the given ClusterRoleBinding; the optional arguments of the client call are
     * {@code null}.
     *
     * @param clusterRoleBinding binding to create
     * @return the binding returned by the API
     * @throws ApiException if the API call fails
     */
    @Override
    public V1ClusterRoleBinding createClusterRoleBinding(V1ClusterRoleBinding clusterRoleBinding)
        throws ApiException {
        final V1ClusterRoleBinding created =
            rbacAuthorizationV1Api.createClusterRoleBinding(clusterRoleBinding, null, null, null);
        return created;
    }

    /**
     * Logs and then creates a ConfigMap in the given namespace.
     *
     * @param namespace namespace receiving the ConfigMap
     * @param configMap ConfigMap to create
     * @return the ConfigMap returned by the API
     * @throws ApiException if the API call fails, e.g. Conflict
     */
    @Override
    public V1ConfigMap createConfigMap(final String namespace, final V1ConfigMap configMap)
        throws ApiException {
        log(namespace, configMap.getKind(), configMap);
        final V1ConfigMap created =
            coreV1Api.createNamespacedConfigMap(namespace, configMap, null, null, null);
        return created;
    }

    /**
     * Logs and then creates a Service in the given namespace.
     *
     * @param namespace namespace receiving the Service
     * @param service   Service to create
     * @return the Service returned by the API
     * @throws ApiException if the API call fails
     */
    @Override
    public V1Service createService(final String namespace, final V1Service service)
        throws ApiException {
        log(namespace, service.getKind(), service);
        final V1Service created =
            coreV1Api.createNamespacedService(namespace, service, null, null, null);
        return created;
    }

    /**
     * Logs and then creates a StatefulSet in the given namespace.
     *
     * @param namespace   namespace receiving the StatefulSet
     * @param statefulSet StatefulSet to create
     * @return the StatefulSet returned by the API
     * @throws ApiException if the API call fails
     */
    @Override
    public V1StatefulSet createStatefulSet(final String namespace, final V1StatefulSet statefulSet)
        throws ApiException {
        log(namespace, statefulSet.getKind(), statefulSet);
        final V1StatefulSet created =
            appsV1Api.createNamespacedStatefulSet(namespace, statefulSet, null, null, null);
        return created;
    }

    /**
     * Logs and then creates a Deployment in the given namespace.
     *
     * @param namespace  namespace receiving the Deployment
     * @param deployment Deployment to create
     * @return the Deployment returned by the API
     * @throws ApiException if the API call fails
     */
    @Override
    public V1Deployment createDeployment(final String namespace, final V1Deployment deployment)
        throws ApiException {
        log(namespace, deployment.getKind(), deployment);
        final V1Deployment created =
            appsV1Api.createNamespacedDeployment(namespace, deployment, null, null, null);
        return created;
    }

    /**
     * Applies {@code v1Patch} to the named Deployment; the remaining optional arguments of the
     * client call are {@code null}.
     *
     * @param deploymentName name of the Deployment to patch
     * @param namespace      namespace containing the Deployment
     * @param v1Patch        patch body
     * @return the Deployment returned by the API
     * @throws ApiException if the API call fails
     */
    @Override
    public V1Deployment patchDeployment(String deploymentName, String namespace, V1Patch v1Patch) throws ApiException {
        final V1Deployment patched = appsV1Api.patchNamespacedDeployment(
            deploymentName, namespace, v1Patch, null, null, null, null);
        return patched;
    }

    /**
     * Patches the named StatefulSet through {@link PatchUtils#patch}, using the strategic-merge
     * patch format and this instance's {@link ApiClient}.
     * <p>
     * refer to: https://github.com/kubernetes-client/java/pull/2116
     *
     * @param statefulSetName name of the StatefulSet to patch
     * @param namespace       namespace containing the StatefulSet
     * @param body            patch body
     * @throws ApiException if the patch call fails
     */
    @Override
    public void patchStatefulSet(String statefulSetName, String namespace, V1Patch body) throws ApiException {
        PatchUtils.patch(
            V1StatefulSet.class,
            () -> appsV1Api.patchNamespacedStatefulSetCall(
                statefulSetName, namespace, body, null, null, null, null, null),
            V1Patch.PATCH_FORMAT_STRATEGIC_MERGE_PATCH,
            apiClient);
    }

    /**
     * Replaces the named ConfigMap. If the client throws a {@link JsonSyntaxException}, a warning
     * is logged and the {@code configMap} argument itself is returned.
     *
     * @param name      name of the ConfigMap to replace
     * @param namespace namespace containing the ConfigMap
     * @param configMap replacement content
     * @return the ConfigMap returned by the API, or {@code configMap} on a JsonSyntaxException
     * @throws ApiException if the API call fails
     */
    @Override
    public V1ConfigMap replaceConfigMap(String name, String namespace, V1ConfigMap configMap)
        throws ApiException {
        V1ConfigMap outcome;
        try {
            outcome = coreV1Api.replaceNamespacedConfigMap(name, namespace, configMap, null, null, null);
        } catch (JsonSyntaxException knownClientError) {
            log.warn(
                "This is a known internal error, just ignore this warning and return the modified V1ConfigMap");
            outcome = configMap;
        }
        return outcome;
    }

    /**
     * Reads the named ConfigMap.
     *
     * @param name      name of the ConfigMap
     * @param namespace namespace containing the ConfigMap
     * @return the ConfigMap, or an empty Optional when the API answers HTTP 404
     * @throws BaseException wrapping any {@link ApiException} whose code is not 404
     */
    @Override
    public Optional<V1ConfigMap> readConfigMap(String name, String namespace) {
        final V1ConfigMap found;
        try {
            found = coreV1Api.readNamespacedConfigMap(name, namespace, null, null, null);
        } catch (ApiException failure) {
            if (failure.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
                throw new BaseException(failure);
            }
            return Optional.empty();
        }
        return Optional.of(found);
    }

    /**
     * Replaces the named Service with {@code service}.
     *
     * @param name      name of the Service to replace
     * @param namespace namespace containing the Service
     * @param service   replacement content
     * @return the Service returned by the API
     * @throws ApiException if the API call fails
     */
    @Override
    public V1Service replaceService(String name, String namespace, V1Service service)
        throws ApiException {
        final V1Service replaced =
            coreV1Api.replaceNamespacedService(name, namespace, service, null, null, null);
        return replaced;
    }

    /**
     * Replaces the named StatefulSet with {@code statefulSet}.
     *
     * @param name        name of the StatefulSet to replace
     * @param namespace   namespace containing the StatefulSet
     * @param statefulSet replacement content
     * @return the StatefulSet returned by the API
     * @throws ApiException if the API call fails
     */
    @Override
    public V1StatefulSet replaceStatefulSet(String name, String namespace, V1StatefulSet statefulSet)
        throws ApiException {
        final V1StatefulSet replaced =
            appsV1Api.replaceNamespacedStatefulSet(name, namespace, statefulSet, null, null, null);
        return replaced;
    }

    /**
     * Replaces the named Deployment with {@code deployment}.
     *
     * @param name       name of the Deployment to replace
     * @param namespace  namespace containing the Deployment
     * @param deployment replacement content
     * @return the Deployment returned by the API
     * @throws ApiException if the API call fails
     */
    @Override
    public V1Deployment replaceDeployment(String name, String namespace, V1Deployment deployment)
        throws ApiException {
        final V1Deployment replaced =
            appsV1Api.replaceNamespacedDeployment(name, namespace, deployment, null, null, null);
        return replaced;
    }

    /**
     * Logs and deletes a namespace. A {@link JsonSyntaxException} raised by the client is handed to
     * {@link #resumeDeleteV1StatusExceptionally(JsonSyntaxException)}.
     * <p>
     * It is a known kubernetes bug, refer to https://github.com/kubernetes/kubernetes/issues/51163.
     * A workaround for https://github.com/kubernetes-client/java/issues/86.
     *
     * @param namespace name of the namespace to delete
     * @param options   delete options forwarded to the API call
     * @return the status returned by the API; expected a status with code 200 and successful
     *     message for the known-bug case
     * @throws ApiException if the API call fails
     */
    @Override
    public V1Status deleteNamespace(final String namespace, final V1DeleteOptions options)
        throws ApiException {
        try {
            log(namespace, K8sModelConstant.NAMESPACE_KIND, namespace, options);
            final V1Status status =
                coreV1Api.deleteNamespace(namespace, null, null, null, null, null, options);
            return status;
        } catch (JsonSyntaxException knownClientBug) {
            return resumeDeleteV1StatusExceptionally(knownClientBug);
        }
    }

    /**
     * Deletes the named ClusterRoleBinding.
     *
     * @param crbName name of the ClusterRoleBinding
     * @param options delete options forwarded to the API call
     * @return the status returned by the API
     * @throws ApiException if the API call fails
     */
    @Override
    public V1Status deleteClusterRoleBinding(String crbName, V1DeleteOptions options)
        throws ApiException {
        final V1Status status = rbacAuthorizationV1Api.deleteClusterRoleBinding(
            crbName, null, null, null, null, null, options);
        return status;
    }

    /**
     * Logs and then deletes the named ConfigMap.
     *
     * @param namespace     namespace containing the ConfigMap
     * @param configMapName name of the ConfigMap
     * @param options       delete options forwarded to the API call
     * @return the status returned by the API
     * @throws ApiException if the API call fails
     */
    @Override
    public V1Status deleteConfigMap(final String namespace, final String configMapName,
                                    final V1DeleteOptions options) throws ApiException {
        log(namespace, K8sModelConstant.CONFIG_MAP_KIND, configMapName, options);
        final V1Status status = coreV1Api.deleteNamespacedConfigMap(
            configMapName, namespace, null, null, null, null, null, options);
        return status;
    }

    /**
     * Logs and then deletes the named Service.
     *
     * @param namespace   namespace containing the Service
     * @param serviceName name of the Service
     * @param options     delete options forwarded to the API call
     * @return the status returned by the API
     * @throws ApiException if the API call fails
     */
    @Override
    public V1Status deleteService(final String namespace, final String serviceName,
                                  final V1DeleteOptions options) throws ApiException {
        log(namespace, K8sModelConstant.SERVICE_KIND, serviceName, options);
        final V1Status status = coreV1Api.deleteNamespacedService(
            serviceName, namespace, null, null, null, null, null, options);
        return status;
    }

    /**
     * Logs and then deletes the named StatefulSet.
     *
     * @param namespace       namespace containing the StatefulSet
     * @param statefulSetName name of the StatefulSet
     * @param options         delete options forwarded to the API call
     * @return the status returned by the API
     * @throws ApiException if the API call fails
     */
    @Override
    public V1Status deleteStatefulSet(final String namespace, final String statefulSetName,
                                      V1DeleteOptions options) throws ApiException {
        log(namespace, K8sModelConstant.STATEFUL_SET_KIND, statefulSetName, options);
        final V1Status status = appsV1Api.deleteNamespacedStatefulSet(
            statefulSetName, namespace, null, null, null, null, null, options);
        return status;
    }

    /**
     * Logs and then deletes the named Deployment.
     *
     * @param namespace      namespace containing the Deployment
     * @param deploymentName name of the Deployment
     * @param options        delete options forwarded to the API call
     * @return the status returned by the API
     * @throws ApiException if the API call fails
     */
    @Override
    public V1Status deleteDeployment(final String namespace, final String deploymentName,
                                     V1DeleteOptions options) throws ApiException {
        log(namespace, K8sModelConstant.DEPLOYMENT_KIND, deploymentName, options);
        final V1Status status = appsV1Api.deleteNamespacedDeployment(
            deploymentName, namespace, null, null, null, null, null, options);
        return status;
    }

    /**
     * Checks whether a namespace with the given name exists by listing all namespaces and comparing
     * metadata names.
     *
     * @param namespace name to look for
     * @return {@code true} if a listed namespace has that name
     * @throws ApiException if the list call fails
     */
    @Override
    public boolean existNamespace(final String namespace) throws ApiException {
        final V1NamespaceList listed =
            coreV1Api.listNamespace(null, null, null, null, null, null, null, null, null);
        for (V1Namespace candidate : listed.getItems()) {
            if (namespace.equals(candidate.getMetadata().getName())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Checks whether a ClusterRoleBinding with the given name exists by listing all bindings and
     * comparing metadata names.
     *
     * @param clusterRoleBinding name to look for
     * @return {@code true} if a listed binding has that name
     * @throws ApiException if the list call fails
     */
    @Override
    public boolean existClusterRoleBinding(String clusterRoleBinding) throws ApiException {
        final V1ClusterRoleBindingList listed = rbacAuthorizationV1Api
            .listClusterRoleBinding(null, null, null, null, null, null, null, null, null);
        for (V1ClusterRoleBinding candidate : listed.getItems()) {
            if (clusterRoleBinding.equals(candidate.getMetadata().getName())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Checks whether the namespace contains a ConfigMap with the given name by listing its
     * ConfigMaps and comparing metadata names.
     *
     * @param namespace     namespace to search
     * @param configMapName name to look for
     * @return {@code true} if a listed ConfigMap has that name
     * @throws ApiException if the list call fails
     */
    @Override
    public boolean existConfigMap(final String namespace, final String configMapName)
        throws ApiException {
        final V1ConfigMapList listed = coreV1Api.listNamespacedConfigMap(
            namespace, null, null, null, null, null, null, null, null, null);
        for (V1ConfigMap candidate : listed.getItems()) {
            if (configMapName.equals(candidate.getMetadata().getName())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Checks whether the namespace contains a Service with the given name by listing its Services
     * and comparing metadata names.
     *
     * @param namespace namespace to search
     * @param service   name to look for
     * @return {@code true} if a listed Service has that name
     * @throws ApiException if the list call fails
     */
    @Override
    public boolean existService(final String namespace, final String service) throws ApiException {
        final V1ServiceList listed = coreV1Api.listNamespacedService(
            namespace, null, null, null, null, null, null, null, null, null);
        for (V1Service candidate : listed.getItems()) {
            if (service.equals(candidate.getMetadata().getName())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Checks whether the namespace contains a StatefulSet with the given name by listing its
     * StatefulSets and comparing metadata names.
     *
     * @param namespace       namespace to search
     * @param statefulSetName name to look for
     * @return {@code true} if a listed StatefulSet has that name
     * @throws ApiException if the list call fails
     */
    @Override
    public boolean existStatefulSet(final String namespace, final String statefulSetName)
        throws ApiException {
        final V1StatefulSetList listed = appsV1Api.listNamespacedStatefulSet(
            namespace, null, null, null, null, null, null, null, null, null);
        for (V1StatefulSet candidate : listed.getItems()) {
            if (statefulSetName.equals(candidate.getMetadata().getName())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Checks whether the namespace contains a Deployment with the given name by listing its
     * Deployments and comparing metadata names.
     *
     * @param namespace      namespace to search
     * @param deploymentName name to look for
     * @return {@code true} if a listed Deployment has that name
     * @throws ApiException if the list call fails
     */
    @Override
    public boolean existDeployment(final String namespace, final String deploymentName)
        throws ApiException {
        final V1DeploymentList listed = appsV1Api.listNamespacedDeployment(
            namespace, null, null, null, null, null, null, null, null, null);
        for (V1Deployment candidate : listed.getItems()) {
            if (deploymentName.equals(candidate.getMetadata().getName())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Reports whether {@link #readNamespacedPod(String, String)} finds the pod.
     *
     * @param namespace namespace to search
     * @param podName   name of the pod
     * @return {@code true} if the pod could be read
     */
    @Override
    public boolean existPod(String namespace, String podName) {
        final Optional<V1Pod> pod = readNamespacedPod(namespace, podName);
        return pod.isPresent();
    }

    /**
     * Reads the named StatefulSet.
     *
     * @param namespace       namespace containing the StatefulSet
     * @param statefulSetName name of the StatefulSet
     * @return the StatefulSet, or an empty Optional when the API answers HTTP 404
     * @throws BaseException wrapping any {@link ApiException} whose code is not 404
     */
    @Override
    public Optional<V1StatefulSet> readNamespacedStatefulSet(String namespace,
                                                             String statefulSetName) {
        final V1StatefulSet found;
        try {
            found = appsV1Api.readNamespacedStatefulSet(statefulSetName, namespace, null, null, null);
        } catch (ApiException failure) {
            if (failure.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
                throw new BaseException(failure);
            }
            return Optional.empty();
        }
        return Optional.of(found);
    }

    /**
     * Reads the named Deployment.
     *
     * @param namespace      namespace containing the Deployment
     * @param deploymentName name of the Deployment
     * @return the Deployment, or an empty Optional when the API answers HTTP 404
     * @throws BaseException wrapping any {@link ApiException} whose code is not 404
     */
    @Override
    public Optional<V1Deployment> readNamespacedDeployment(String namespace, String deploymentName) {
        final V1Deployment found;
        try {
            found = appsV1Api.readNamespacedDeployment(deploymentName, namespace, null, null, null);
        } catch (ApiException failure) {
            if (failure.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
                throw new BaseException(failure);
            }
            return Optional.empty();
        }
        return Optional.of(found);
    }

    /**
     * Reads the named pod.
     *
     * @param namespace namespace containing the pod
     * @param podName   name of the pod
     * @return the pod, or an empty Optional when the API answers HTTP 404
     * @throws BaseException wrapping any {@link ApiException} whose code is not 404
     */
    @Override
    public Optional<V1Pod> readNamespacedPod(String namespace, String podName) {
        final V1Pod found;
        try {
            found = coreV1Api.readNamespacedPod(podName, namespace, null, null, null);
        } catch (ApiException failure) {
            if (failure.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
                throw new BaseException(failure);
            }
            return Optional.empty();
        }
        return Optional.of(found);
    }

    /**
     * Opens a stream over the log of the given pod (the container argument is passed as
     * {@code null}).
     * <p>
     * The caller receives an open {@link InputStream} and should close it when done.
     *
     * @param namespace namespace containing the pod
     * @param podName   name of the pod
     * @return the log stream, or an empty Optional when the API answers HTTP 404 or an
     *     {@link IOException} occurs while opening the stream
     * @throws BaseException wrapping any {@link ApiException} whose code is not 404
     */
    @Override
    public Optional<InputStream> readNamespacedPodLogStream(String namespace, String podName) {
        try {
            return Optional.of(podLogs.streamNamespacedPodLog(namespace, podName, null));
        } catch (ApiException apiException) {
            if (apiException.getCode() == HttpURLConnection.HTTP_NOT_FOUND) {
                return Optional.empty();
            } else {
                throw new BaseException(apiException);
            }
        } catch (IOException ioException) {
            // Still best-effort (empty result), but record the failure instead of hiding it so a
            // transport problem can be told apart from a pod that simply does not exist.
            log.warn("Failed to open log stream for pod {} in namespace {}", podName, namespace,
                ioException);
            return Optional.empty();
        }
    }

    /**
     * Exposes the {@link ApiClient} created in the constructor.
     *
     * @return the client held by this instance
     */
    @Override
    public ApiClient getApiClient() {
        return apiClient;
    }

    /**
     * Lists nodes filtered by {@code labelSelector}; every other optional argument of the client
     * call is passed as {@code null}.
     *
     * @param labelSelector label selector forwarded to the API call
     * @return the node list returned by the API
     * @throws ApiException if the API call fails
     */
    @Override
    public V1NodeList listNode(String labelSelector) throws ApiException {
        final V1NodeList nodes =
            coreV1Api.listNode(null, null, null, null, labelSelector, null, null, null, null);
        return nodes;
    }

    /**
     * Summarises, for every node selected by {@code labelSelector}, its allocatable CPU/memory and
     * the summed CPU/memory requests and limits of the containers scheduled on it.
     * <p>
     * Pods are listed across all namespaces (without the label selector), kept only when their
     * phase is contained in {@link K8sModelConstant#POD_PHASE}, and attributed to a node through
     * the pod spec's node name. Totals are always set on the result, so a node without any matching
     * pod reports zero rather than unset values.
     *
     * @param labelSelector label selector passed to {@link #listNode(String)}
     * @return one {@link NodeResource} per listed node, in listing order
     * @throws ApiException if listing nodes or pods fails
     */
    @Override
    public List<NodeResource> getNodeResources(String labelSelector) throws ApiException {
        List<NodeResource> nodeResourceList = Lists.newArrayList();
        List<V1Node> nodeList = listNode(labelSelector).getItems();
        // list pods without selector label
        List<V1Pod> podList = listPodForAllNamespaces().getItems().stream()
            .filter(pod -> K8sModelConstant.POD_PHASE.contains(pod.getStatus().getPhase()))
            .collect(Collectors.toList());
        for (V1Node node : nodeList) {
            NodeResource nodeResource = new NodeResource();
            nodeResource.setV1Node(node);
            // calc Allocatable
            Map<String, Quantity> allocatable = node.getStatus().getAllocatable();
            nodeResource.setCpuAllocatable(allocatable.get(K8sModelConstant.NODE_CPU).getNumber());
            nodeResource.setMemoryAllocatable(allocatable.get(K8sModelConstant.NODE_MEMORY).getNumber());

            BigDecimal cpuRequests = BigDecimal.ZERO;
            BigDecimal cpuLimits = BigDecimal.ZERO;
            BigDecimal memoryRequests = BigDecimal.ZERO;
            BigDecimal memoryLimits = BigDecimal.ZERO;

            String nodeName = node.getMetadata().getName();

            // calc Requests and Limits over the pods scheduled on this node
            for (V1Pod v1Pod : podList) {
                if (v1Pod.getSpec() == null) {
                    continue;
                }
                String podNodeName = v1Pod.getSpec().getNodeName();
                if (podNodeName == null || !podNodeName.equals(nodeName)) {
                    continue;
                }
                for (V1Container v1Container : v1Pod.getSpec().getContainers()) {
                    // a container may carry no resources section at all
                    if (v1Container.getResources() == null) {
                        continue;
                    }
                    Map<String, Quantity> requests = v1Container.getResources().getRequests();
                    Map<String, Quantity> limits = v1Container.getResources().getLimits();
                    // skip containers that do not have both requests and limits settings
                    // NOTE(review): a container declaring only requests (or only limits) is left
                    // out of all four totals, as before - confirm this is intended.
                    if (requests != null && limits != null) {
                        cpuRequests = cpuRequests.add(
                            requests.getOrDefault(K8sModelConstant.NODE_CPU, buildZeroQuantity()).getNumber());
                        cpuLimits = cpuLimits.add(
                            limits.getOrDefault(K8sModelConstant.NODE_CPU, buildZeroQuantity()).getNumber());
                        memoryRequests = memoryRequests.add(
                            requests.getOrDefault(K8sModelConstant.NODE_MEMORY, buildZeroQuantity()).getNumber());
                        memoryLimits = memoryLimits.add(
                            limits.getOrDefault(K8sModelConstant.NODE_MEMORY, buildZeroQuantity()).getNumber());
                    }
                }
            }

            // Set the totals once, after the pod loop, so they are populated even when no pod
            // matched this node.
            nodeResource.setCpuRequests(cpuRequests);
            nodeResource.setCpuLimits(cpuLimits);
            nodeResource.setMemoryRequests(memoryRequests);
            nodeResource.setMemoryLimits(memoryLimits);

            nodeResourceList.add(nodeResource);
        }
        return nodeResourceList;
    }

    /**
     * Produces a fresh {@link Quantity} parsed from the literal {@code "0.0"}.
     *
     * @return a newly parsed zero quantity
     */
    private Quantity buildZeroQuantity() {
        final QuantityFormatter formatter = new QuantityFormatter();
        return formatter.parse("0.0");
    }

    /**
     * A workaround for https://github.com/kubernetes-client/java/issues/86: when the exception is
     * caused by an {@link IllegalStateException} whose message contains
     * "Expected a string but was BEGIN_OBJECT", a status with code 200 and a successful message is
     * returned; any other exception is rethrown unchanged.
     *
     * @param ex json syntax exception
     * @return a synthetic successful status for the known case
     */
    private V1Status resumeDeleteV1StatusExceptionally(JsonSyntaxException ex) {
        final Throwable cause = ex.getCause();
        if (!(cause instanceof IllegalStateException)) {
            throw ex;
        }
        final String detail = cause.getMessage();
        if (detail == null || !detail.contains("Expected a string but was BEGIN_OBJECT")) {
            throw ex;
        }
        // skip this as expected
        return new V1Status().kind("Status").code(200).message("successful");
    }

    /**
     * Logs the creation of a Kubernetes object at INFO level, with the object serialized through
     * the API client's JSON helper. The namespace is left out of the message when {@code kind} is
     * the namespace kind.
     *
     * @param namespace namespace the object is created in
     * @param kind      kind of the object
     * @param k8sObject object being created
     */
    private void log(final String namespace, final String kind, final Object k8sObject) {
        final String payload = appsV1Api.getApiClient().getJSON().serialize(k8sObject);
        final boolean namespaceKind = K8sModelConstant.NAMESPACE_KIND.equals(kind);
        if (!namespaceKind) {
            log.info("in Namespace:{} create {}:{} ", namespace, kind, payload);
            return;
        }
        log.info("create {}:{}", kind, payload);
    }

    /**
     * Logs the deletion of a Kubernetes object at INFO level, with the delete options serialized
     * through the API client's JSON helper. The namespace is left out of the message when
     * {@code kind} is the namespace kind.
     *
     * @param namespace  namespace the object is deleted from
     * @param kind       kind of the object
     * @param objectName name of the object being deleted
     * @param options    delete options being applied
     */
    private void log(final String namespace, final String kind, final String objectName,
                     final V1DeleteOptions options) {
        final String serializedOptions = appsV1Api.getApiClient().getJSON().serialize(options);
        final boolean namespaceKind = K8sModelConstant.NAMESPACE_KIND.equals(kind);
        if (!namespaceKind) {
            log.info("in Namespace:{} delete {}:{} deleteOptions:{} ", namespace, kind, objectName,
                serializedOptions);
            return;
        }
        log.info("delete {}:{} deleteOptions:{}", kind, objectName, serializedOptions);
    }
}
